-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NUTCH-2793 indexer-csv: make it work in distributed mode #534
base: master
Are you sure you want to change the base?
Conversation
Before the change, the output file name was hard-coded to "nutch.csv". When running in distributed mode, multiple reducers would clobber each other output. After the change, the filename is taken from the first open(cfg, name) initialization call, where name is a unique file name generated by IndexerOutputFormat, derived from hadoop FileOutputFormat. The CSV files are now named like part-r-000xx.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @pmezard! The indexer-csv was initially thought more as a debugging tool and eventually a quick export utility in local mode only (CSV is not really a format for big data). To lift the limitation to local mode would require a couple of substantial changes to the IndexWriter interface, esp. if we want to reliably allow for any filesystems supported by Hadoop. You may have a look on this description of the committer architecture to get an insight into the requirements of atomic commits etc.
@@ -192,7 +189,7 @@ protected int find(String value, int start) { | |||
|
|||
@Override | |||
public void open(Configuration conf, String name) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is deprecated since the switch to the XML-based index writer configuration (see NUTCH-1480 and the wiki page IndexWriters). "name" was just an arbitrary name not a file name indicating a task-specific output path. We would need a method which takes both: the IndexWriterParams and the output path. This would require changes in the IndexWriter interface and also the classes IndexWriters and IndexerMapReduce. I'm also not sure whether the output path alone is sufficient. We'll eventually need an OutputCommitter and need to think about situations if we have multiple index writers (eg. via exchanges). See also the discussion in NUTCH-1541.
outpath | Output path / directory (local filesystem path, relative to current working directory) | csvindexwriter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still "local filesystem"? Ev. we could the outpath to overcome the problem of multiple index writers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I did not understand that, could you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I've mixed two points mixed together:
- the description would also need a change as it will not be a path on the local filesystem if running in distributed mode
- there is also the open question how to allow two index writers writing output the filesystem:
- in local mode this would require that the
outpath
points to a different directory - in distributed mode we could use
outpath
to write into distinct output directories or distinct subdirectories of one job-specific output directory
- in local mode this would require that the
What are the backward compatibility requirements for nutch? Is it OK to just change the interface and implement what you suggest? Should it be best-effort to keep things BC? Or is it impossible to implement such a change at this point? |
Yes, that's ok. We'll put a notice about a breaking change to the release notes, so that users having there own indexer plugin know they have to adapt it.
We could try to only extend the IndexWriter interface and provide default do-nothing implementations for newly added methods as most index writers do not write data to the filesystem. |
OK, there is a lot to unpack. Let me try to rephrase what was my naive understanding of the issue, how I intended to fix it and what is wrong about it. What I saw is indexing to csv worked locally but failed in a distributed setup (with only 3 nodes). The reduce step emitted errors when writing data to GCS. At the end, there was something containing roughly a third of the expected dataset. I assumed I had 3 reducers overwriting each other with only one winner at the end (or a mix of winning output blocks). So I thought "if only I could map the CSVIndexWriter output file to a reducer to separate each reducer output, that would solve the issue". What you are saying is:
With this current understanding, I would now implement it like:
How far am I? |
Thanks for the exhaustive listing. I have only a few points to add.
The MapReduce framework takes care of data serialization and concurrency issues: the reduce() method is never called concurrently within one task - tasks run in parallel and that's why every task needs it's own output (part-r-nnnnn). The name of the output file (the number in n) is also determined by the framework - that's important if a task is restarted to avoid duplicated output.
I think we need 3 components:
In short, the path of a task output might look like:
You mean [ParseOutputFormat::getUniqueFile](https://github.com/apache/nutch/blob/59d0d9532abdac409e123ab103a506cfb0df790a/src/java/org/apache/nutch/parse/ParseOutputFormat.java#L120]? ParseOutputFormat or FetcherOutputFormat are good examples as they write output into multiple segment subdirectories. Hence, there are no plugins involved which determine whether there is output written to the filesystem or not.
That could be done using default method implementations in Java 8 interfaces. Note: Nutch requires now Java 8 but it started with Java 1.4 and there is still a lot of code not using features of Java 8. Also, to keep the indexer usable (because most index writers (solr, elasticsearch, etc.) do not write output to the filesystem): if nothing is written to the filesystem IndexingJob should not require an output location as command-line argument. |
Thank you for the details. One thing I wonder is if it would not be possible to define the index-writers specific path as their identifier in index-writers.xml, at least by default. It would be unique by construction, which reduces a bit the amount of configuration. Drawbacks:
|
Yes, we could use the identifier but as we already have the param "outpath" - why not use it? The other constraints should be documented. |
Before the change, the output file name was hard-coded to "nutch.csv".
When running in distributed mode, multiple reducers would clobber each
other output.
After the change, the filename is taken from the first open(cfg, name)
initialization call, where name is a unique file name generated by
IndexerOutputFormat, derived from hadoop FileOutputFormat. The CSV files
are now named like part-r-000xx.